package org.databandtech.flinkstreaming;

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class StateCountWindow extends RichFlatMapFunction<Tuple2<String, Long>, Map<String,Tuple2<Long, Long>>> {

	private static final long serialVersionUID = 1L;
	private transient ValueState<Map<String,Tuple2<Long, Long>>> sum;

    @Override
    public void flatMap(Tuple2<String, Long> input, Collector<Map<String,Tuple2<Long, Long>>> out) throws Exception {

        //获取状态
    	Map<String,Tuple2<Long, Long>> currentmap = sum.value();
    	
    	if (currentmap.containsKey(input.f0)) {
    		Tuple2<Long, Long> old = currentmap.get(input.f0);
    		currentmap.replace(input.f0, new Tuple2<Long, Long>(old.f0 +1 , old.f1+input.f1));
    	}else {
    		currentmap.put(input.f0, new Tuple2<Long, Long>(1L , input.f1));
    	}

        //状态更新
        sum.update(currentmap);
        out.collect(currentmap);//注意，state缓存和输出的类型是一致的

    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Map<String,Tuple2<Long, Long>>> descriptor =
        new ValueStateDescriptor<Map<String,Tuple2<Long, Long>>>(
                "countwindow", // state name
                TypeInformation.of(new TypeHint<Map<String,Tuple2<Long, Long>>>() {}), // type information
                new HashMap<String,Tuple2<Long, Long>>() ); // default value of the state, if nothing was set        
        sum = getRuntimeContext().getState(descriptor);
    }
}
